In this tutorial we solve the optimal control problem
$$\min J(y, u) = \frac{1}{2} \int_{\Omega} |v - v_d|^2 dx + \frac{\alpha}{2} \int_{\Omega} |u|^2 dx$$s.t. $$\begin{cases}
\end{cases}$$
where $$\begin{align*} & \Omega & \text{unit square}\\ & u \in [L^2(\Omega)]^2 & \text{control variable}\\ & v \in [H^1_0(\Omega)]^2 & \text{state velocity variable}\\ & p \in L^2(\Omega) & \text{state pressure variable}\\ & \alpha > 0 & \text{penalization parameter}\\ & v_d & \text{desired state}\\ & \nu & \text{kinematic viscosity}\\ & f & \text{forcing term} \end{align*}$$ using an adjoint formulation solved by a one shot approach
import typing
import dolfinx.fem
import dolfinx.io
import dolfinx.mesh
import mpi4py.MPI
import numpy as np
import numpy.typing
import petsc4py.PETSc
import sympy
import ufl
import viskex
import multiphenicsx.fem
mesh = dolfinx.mesh.create_unit_square(mpi4py.MPI.COMM_WORLD, 32, 32)
def bottom(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the bottom boundary."""
return abs(x[1] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def left(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the left boundary."""
return abs(x[0] - 0.) < np.finfo(float).eps # type: ignore[no-any-return]
def top(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the top boundary."""
return abs(x[1] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
def right(x: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.bool_]:
"""Condition that defines the right boundary."""
return abs(x[0] - 1.) < np.finfo(float).eps # type: ignore[no-any-return]
boundaries_entities = dict()
boundaries_values = dict()
for (boundary, boundary_id) in zip((bottom, left, top, right), (1, 2, 3, 4)):
boundaries_entities[boundary_id] = dolfinx.mesh.locate_entities_boundary(
mesh, mesh.topology.dim - 1, boundary)
boundaries_values[boundary_id] = np.full(
boundaries_entities[boundary_id].shape, boundary_id, dtype=np.int32)
boundaries_entities_unsorted = np.hstack(list(boundaries_entities.values()))
boundaries_values_unsorted = np.hstack(list(boundaries_values.values()))
boundaries_entities_argsort = np.argsort(boundaries_entities_unsorted)
boundaries_entities_sorted = boundaries_entities_unsorted[boundaries_entities_argsort]
boundaries_values_sorted = boundaries_values_unsorted[boundaries_entities_argsort]
boundaries = dolfinx.mesh.meshtags(
mesh, mesh.topology.dim - 1,
boundaries_entities_sorted, boundaries_values_sorted)
boundaries.name = "boundaries"
boundaries_1234 = boundaries.indices[np.isin(boundaries.values, (1, 2, 3, 4))]
viskex.dolfinx.plot_mesh(mesh)
viskex.dolfinx.plot_mesh_entities(mesh, mesh.topology.dim - 1, "boundaries", boundaries_1234)
Y_velocity = dolfinx.fem.VectorFunctionSpace(mesh, ("Lagrange", 2))
Y_pressure = dolfinx.fem.FunctionSpace(mesh, ("Lagrange", 1))
U = dolfinx.fem.VectorFunctionSpace(mesh, ("Lagrange", 2))
Q_velocity = Y_velocity.clone()
Q_pressure = Y_pressure.clone()
(dv, dp) = (ufl.TrialFunction(Y_velocity), ufl.TrialFunction(Y_pressure))
(w, q) = (ufl.TestFunction(Y_velocity), ufl.TestFunction(Y_pressure))
du = ufl.TrialFunction(U)
r = ufl.TestFunction(U)
(dz, db) = (ufl.TrialFunction(Q_velocity), ufl.TrialFunction(Q_pressure))
(s, d) = (ufl.TestFunction(Q_velocity), ufl.TestFunction(Q_pressure))
(v, p) = (dolfinx.fem.Function(Y_velocity), dolfinx.fem.Function(Y_pressure))
u = dolfinx.fem.Function(U)
(z, b) = (dolfinx.fem.Function(Q_velocity), dolfinx.fem.Function(Q_pressure))
alpha = 1.e-5
epsilon = 1.e-5
x, y = sympy.symbols("x[0], x[1]")
psi_d = 10 * (1 - sympy.cos(0.8 * np.pi * x)) * (1 - sympy.cos(0.8 * np.pi * y)) * (1 - x)**2 * (1 - y)**2
v_d_x = sympy.lambdify([x, y], psi_d.diff(y, 1))
v_d_y = sympy.lambdify([x, y], - psi_d.diff(x, 1))
v_d = dolfinx.fem.Function(Y_velocity)
v_d.interpolate(lambda x: np.stack((v_d_x(x[0], x[1]), v_d_y(x[0], x[1])), axis=0))
nu = 0.01
ff = dolfinx.fem.Constant(mesh, tuple(petsc4py.PETSc.ScalarType(0) for _ in range(2)))
bc0 = np.zeros((2, ), dtype=petsc4py.PETSc.ScalarType)
F = [nu * ufl.inner(ufl.grad(z), ufl.grad(w)) * ufl.dx
+ ufl.inner(z, ufl.grad(w) * v) * ufl.dx + ufl.inner(z, ufl.grad(v) * w) * ufl.dx
- ufl.inner(b, ufl.div(w)) * ufl.dx + ufl.inner(v - v_d, w) * ufl.dx,
- ufl.inner(ufl.div(z), q) * ufl.dx + epsilon * ufl.inner(b, q) * ufl.dx,
alpha * ufl.inner(u, r) * ufl.dx - ufl.inner(z, r) * ufl.dx,
nu * ufl.inner(ufl.grad(v), ufl.grad(s)) * ufl.dx + ufl.inner(ufl.grad(v) * v, s) * ufl.dx
- ufl.inner(p, ufl.div(s)) * ufl.dx - ufl.inner(u + ff, s) * ufl.dx,
- ufl.inner(ufl.div(v), d) * ufl.dx + epsilon * ufl.inner(p, d) * ufl.dx]
dF = [[ufl.derivative(F_i, u_j, du_j) for (u_j, du_j) in zip((v, p, u, z, b), (dv, dp, du, dz, db))] for F_i in F]
dF[3][3] = dolfinx.fem.Constant(mesh, petsc4py.PETSc.ScalarType(0)) * ufl.inner(dz, s) * ufl.dx
bdofs_Y_velocity_1234 = dolfinx.fem.locate_dofs_topological(
Y_velocity, mesh.topology.dim - 1, boundaries_1234)
bdofs_Q_velocity_1234 = dolfinx.fem.locate_dofs_topological(
Q_velocity, mesh.topology.dim - 1, boundaries_1234)
bc = [dolfinx.fem.dirichletbc(bc0, bdofs_Y_velocity_1234, Y_velocity),
dolfinx.fem.dirichletbc(bc0, bdofs_Q_velocity_1234, Q_velocity)]
J = 0.5 * ufl.inner(v - v_d, v - v_d) * ufl.dx + 0.5 * alpha * ufl.inner(u, u) * ufl.dx
J_cpp = dolfinx.fem.form(J)
class NonlinearBlockProblem(object):
"""Define a nonlinear problem, interfacing with SNES."""
def __init__( # type: ignore[no-any-unimported]
self, F: typing.List[ufl.Form], dF: typing.List[typing.List[ufl.Form]],
solutions: typing.Tuple[dolfinx.fem.Function, ...], bcs: typing.List[dolfinx.fem.DirichletBCMetaClass]
) -> None:
self._F = dolfinx.fem.form(F)
self._dF = dolfinx.fem.form(dF)
self._obj_vec = dolfinx.fem.petsc.create_vector_block(self._F)
self._solutions = solutions
self._bcs = bcs
def create_snes_solution(self) -> petsc4py.PETSc.Vec: # type: ignore[no-any-unimported]
"""
Create a petsc4py.PETSc.Vec to be passed to petsc4py.PETSc.SNES.solve.
The returned vector will be initialized with the initial guesses provided in `self._solutions`,
properly stacked together in a single block vector.
"""
x = dolfinx.fem.petsc.create_vector_block(self._F)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions]) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.vector.localForm() as component_local:
x_wrapper_local[:] = component_local
return x
def update_solutions(self, x: petsc4py.PETSc.Vec) -> None: # type: ignore[no-any-unimported]
"""Update `self._solutions` with data in `x`."""
x.ghostUpdate(addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(
x, [c.function_space.dofmap for c in self._solutions]) as x_wrapper:
for x_wrapper_local, component in zip(x_wrapper, self._solutions):
with component.vector.localForm() as component_local:
component_local[:] = x_wrapper_local
def obj( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec
) -> np.float64:
"""Compute the norm of the residual."""
self.F(snes, x, self._obj_vec)
return self._obj_vec.norm() # type: ignore[no-any-return]
def F( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, F_vec: petsc4py.PETSc.Vec
) -> None:
"""Assemble the residual."""
self.update_solutions(x)
with F_vec.localForm() as F_vec_local:
F_vec_local.set(0.0)
dolfinx.fem.petsc.assemble_vector_block( # type: ignore[misc]
F_vec, self._F, self._dF, self._bcs, x0=x, scale=-1.0)
def dF( # type: ignore[no-any-unimported]
self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, dF_mat: petsc4py.PETSc.Mat,
_: petsc4py.PETSc.Mat
) -> None:
"""Assemble the jacobian."""
dF_mat.zeroEntries()
dolfinx.fem.petsc.assemble_matrix_block( # type: ignore[misc]
dF_mat, self._dF, self._bcs, diagonal=1.0) # type: ignore[arg-type]
dF_mat.assemble()
# Create problem by extracting state forms from the optimality conditions
F_state = [ufl.replace(F[i], {s: w, d: q}) for i in (3, 4)]
dF_state = [[ufl.replace(dF[i][j], {s: w, d: q}) for j in (0, 1)] for i in (3, 4)]
bc_state = [bc[0]]
problem_state = NonlinearBlockProblem(F_state, dF_state, (v, p), bc_state)
F_vec_state = dolfinx.fem.petsc.create_vector_block(problem_state._F)
dF_mat_state = dolfinx.fem.petsc.create_matrix_block(problem_state._dF)
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem_state.obj)
snes.setFunction(problem_state.F, F_vec_state)
snes.setJacobian(problem_state.dF, J=dF_mat_state, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vp = problem_state.create_snes_solution()
snes.solve(None, vp)
problem_state.update_solutions(vp) # TODO can this be safely removed?
vp.destroy()
snes.destroy()
0 0.0
<petsc4py.PETSc.SNES at 0x7f0d05310d60>
J_uncontrolled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Uncontrolled J =", J_uncontrolled)
assert np.isclose(J_uncontrolled, 0.1784536)
Uncontrolled J = 0.17845361493420545
viskex.dolfinx.plot_vector_field(v, "uncontrolled state velocity", glyph_factor=1e-1)
viskex.dolfinx.plot_scalar_field(p, "uncontrolled state pressure")
# PYTEST_XFAIL_AND_SKIP_NEXT: ufl.derivative(adjoint of the trilinear term) introduces spurious conj(trial)
"""
assert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)
""" # noqa: D
'\nassert not np.issubdtype(petsc4py.PETSc.ScalarType, np.complexfloating)\n'
"""
# Create problem associated to the optimality conditions
problem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc)
F_vec = dolfinx.fem.petsc.create_vector_block(problem._F)
dF_mat = dolfinx.fem.petsc.create_matrix_block(problem._dF)
""" # noqa: D
'\n# Create problem associated to the optimality conditions\nproblem = NonlinearBlockProblem(F, dF, (v, p, u, z, b), bc)\nF_vec = dolfinx.fem.petsc.create_vector_block(problem._F)\ndF_mat = dolfinx.fem.petsc.create_matrix_block(problem._dF)\n'
"""
# Solve
snes = petsc4py.PETSc.SNES().create(mesh.comm)
snes.setTolerances(max_it=20)
snes.getKSP().setType("preonly")
snes.getKSP().getPC().setType("lu")
snes.getKSP().getPC().setFactorSolverType("mumps")
snes.setObjective(problem.obj)
snes.setFunction(problem.F, F_vec)
snes.setJacobian(problem.dF, J=dF_mat, P=None)
snes.setMonitor(lambda _, it, residual: print(it, residual))
vpuzb = problem.create_snes_solution()
snes.solve(None, vpuzb)
problem.update_solutions(vpuzb) # TODO can this be safely removed?
vpuzb.destroy()
snes.destroy()
""" # noqa: D
'\n# Solve\nsnes = petsc4py.PETSc.SNES().create(mesh.comm)\nsnes.setTolerances(max_it=20)\nsnes.getKSP().setType("preonly")\nsnes.getKSP().getPC().setType("lu")\nsnes.getKSP().getPC().setFactorSolverType("mumps")\nsnes.setObjective(problem.obj)\nsnes.setFunction(problem.F, F_vec)\nsnes.setJacobian(problem.dF, J=dF_mat, P=None)\nsnes.setMonitor(lambda _, it, residual: print(it, residual))\nvpuzb = problem.create_snes_solution()\nsnes.solve(None, vpuzb)\nproblem.update_solutions(vpuzb) # TODO can this be safely removed?\nvpuzb.destroy()\nsnes.destroy()\n'
"""
J_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Optimal J =", J_controlled)
assert np.isclose(J_controlled, 9.9127635e-7)
""" # noqa: D
'\nJ_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)\nprint("Optimal J =", J_controlled)\nassert np.isclose(J_controlled, 9.9127635e-7)\n'
"""
viskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)
""" # noqa: D
'\nviskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-1)\n'
"""
viskex.dolfinx.plot_scalar_field(p, "state pressure")
""" # noqa: D
'\nviskex.dolfinx.plot_scalar_field(p, "state pressure")\n'
"""
viskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)
""" # noqa: D
'\nviskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)\n'
"""
viskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1e4)
""" # noqa: D
'\nviskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1e4)\n'
"""
viskex.dolfinx.plot_scalar_field(b, "adjoint pressure")
""" # noqa: D
'\nviskex.dolfinx.plot_scalar_field(b, "adjoint pressure")\n'